package com.isyscore.os.metadata.kettle.step;

import com.isyscore.os.metadata.enums.KettleDataFlowNodeType;
import com.isyscore.os.metadata.kettle.base.FlowConfig;
import com.isyscore.os.metadata.kettle.base.Step;
import com.isyscore.os.metadata.kettle.vis.FieldTuple;
import com.isyscore.os.metadata.kettle.vis.InsertUpdateNode;
import com.isyscore.os.metadata.kettle.vis.VisNode;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.plugins.PluginInterface;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;

import java.util.List;


public class InsertUpdateStep implements Step {
    @Override
    public StepMeta decode(VisNode step, List<DatabaseMeta> databases, TransMeta transMeta, FlowConfig transGraph) throws Exception {
        InsertUpdateNode node = (InsertUpdateNode)step;

        PluginRegistry registry = PluginRegistry.getInstance();
        PluginInterface sp = registry.findPluginWithId( StepPluginType.class, KettleDataFlowNodeType.InsertUpdate.name() );
        StepMetaInterface stepMetaInterface = (StepMetaInterface) registry.loadClass( sp );

        InsertUpdateMeta insertUpdateMeta = (InsertUpdateMeta) stepMetaInterface;
        DatabaseMeta database = DatabaseMeta.findDatabase(databases, String.valueOf(node.getDataSourceId()));
        insertUpdateMeta.setDatabaseMeta(database);
        if(!database.getPluginId().equals("MYSQL")){
            insertUpdateMeta.setSchemaName(step.getDatabaseName());
        }
        insertUpdateMeta.setTableName(node.getTableName());

        String[] keyLookup = new String[node.getKeyFieldMapping().size()];
        String[] keyCondition = new String[node.getKeyFieldMapping().size()];
        String[] keyStream1 = new String[node.getKeyFieldMapping().size()];
        String[] keyStream2 = new String[node.getKeyFieldMapping().size()];

        for (int i = 0; i < node.getKeyFieldMapping().size(); i++) {
            FieldTuple fieldTuple = node.getKeyFieldMapping().get(i);
            keyLookup[i] =fieldTuple.getToField();
            keyCondition[i] = "=";
            keyStream1[i] = fieldTuple.getFromField();
        }

        //构造对比字段
        insertUpdateMeta.setKeyLookup(keyLookup);
        insertUpdateMeta.setKeyCondition(keyCondition);
        insertUpdateMeta.setKeyStream(keyStream1);
        insertUpdateMeta.setKeyStream2(keyStream2);

        insertUpdateMeta.setKeyLookup(keyLookup);
        insertUpdateMeta.setKeyCondition(keyCondition);

        String[] updateLookup = new String[node.getUpdateFieldMapping().size()];
        String[] updateStream = new String[node.getUpdateFieldMapping().size()];
        Boolean[] update = new Boolean[node.getUpdateFieldMapping().size()];

        //构造更新字段
        for (int i = 0; i < node.getUpdateFieldMapping().size(); i++) {
            FieldTuple fieldTuple = node.getUpdateFieldMapping().get(i);
            updateLookup[i] = fieldTuple.getToField();
            updateStream[i] = fieldTuple.getFromField();
            update[i] = true;
        }
        insertUpdateMeta.setUpdateLookup(updateLookup);
        insertUpdateMeta.setUpdateStream(updateStream);
        insertUpdateMeta.setUpdate(update);

        StepMeta stepMeta = new StepMeta(KettleDataFlowNodeType.TableOutput.name(), step.getNodeName(), stepMetaInterface);
        stepMeta.setDraw(true);
        //指定指定并发度
        stepMeta.setCopiesString(String.valueOf(node.getCopies()==null?1:node.getCopies()));
//        stepMeta.setCopiesString("3");
        return stepMeta;
    }

    @Override
    public StepMeta after(VisNode step, List<DatabaseMeta> databases, TransMeta transMeta, FlowConfig transGraph) throws Exception {
        return null;
    }
}
